{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T05:43:22.447471Z",
     "start_time": "2019-04-08T05:43:21.160712Z"
    }
   },
   "outputs": [],
   "source": [
    "import pandas as pd \n",
    "import gc\n",
    "import os \n",
    "import warnings\n",
    "import numpy as np \n",
    "import time\n",
    "import re\n",
    "from sklearn.preprocessing import OneHotEncoder,LabelEncoder\n",
    "from contextlib import contextmanager\n",
    "from pyspark.sql.functions import  udf\n",
    "from pyspark.sql.types import  IntegerType\n",
    "from pyspark.sql.types import  DoubleType\n",
    "from pyspark.sql.functions import  *\n",
    "warnings.filterwarnings('ignore')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-02T04:42:37.252133Z",
     "start_time": "2019-04-02T04:42:31.113779Z"
    }
   },
   "outputs": [],
   "source": [
    "# from pyspark import SparkService\n",
    "# # # 创建local模式的SparkSession\n",
    "# spark = SparkService.get_local_spark(executor_instances=3, driver_mem='10g')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T05:51:07.497538Z",
     "start_time": "2019-04-08T05:50:37.843328Z"
    }
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkService\n",
    "# 创建spark会话，自定义资源\n",
    "# spark = SparkService.get_spark(executor_instances=1, per_executor_mem='1g', driver_mem='1g')\n",
    "# 创建spark会话，使用默认资源\n",
    "spark = SparkService.get_spark(executor_instances=4,per_executor_mem='4g',driver_mem='2g')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T05:51:08.595167Z",
     "start_time": "2019-04-08T05:51:07.501162Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------------+\n",
      "|    databaseName|\n",
      "+----------------+\n",
      "|        aip_demo|\n",
      "|         aipdemo|\n",
      "| chinese_segment|\n",
      "|        customer|\n",
      "|       dataworks|\n",
      "|         default|\n",
      "|   email_message|\n",
      "|     outertables|\n",
      "|      qx_testing|\n",
      "|       sitevisit|\n",
      "|   sitevisit_dev|\n",
      "|suspect_phonenum|\n",
      "+----------------+\n",
      "\n",
      "++\n",
      "||\n",
      "++\n",
      "++\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>database</th>\n",
       "      <th>tableName</th>\n",
       "      <th>isTemporary</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>apma</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>app_events</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>app_labels</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>breastw</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>credit_bureau</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>credit_credit_card_balanced</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>6</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>credit_pos_cash_balanced</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>7</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>credit_previous_application</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>8</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>credit_test</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>9</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>credit_train</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>10</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>emnist_test</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>11</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>emnist_train</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>12</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>events</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>13</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>flw_dataset</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>14</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>gender_age_train</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>15</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>gender_age_train_all</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>16</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>give_me_credit</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>17</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_application_test</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>18</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_application_train</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>19</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_bureau</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>20</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_bureau_balance</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>21</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_credit_card_balance</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>22</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_feature</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>23</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_installments_payments</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>24</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_pos_cash_balance</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>25</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>home_credit_previous_application</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>26</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>label_categories</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>27</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>mnist_test</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>28</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>mnist_train</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>29</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>phone_brand_device_model</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>30</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>purchase_redemption_mfd_bank_shibor</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>31</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>purchase_redemption_mfd_day_share_interest</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>32</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>purchase_redemption_user_balance</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>33</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>purchase_redemption_user_profile</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>34</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>reddit</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>35</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>stmt</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>36</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>taxi_trip_duration</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>37</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>uci_credit_card_data</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>38</th>\n",
       "      <td>qx_testing</td>\n",
       "      <td>web_google</td>\n",
       "      <td>False</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "      database                                   tableName  isTemporary\n",
       "0   qx_testing                                        apma        False\n",
       "1   qx_testing                                  app_events        False\n",
       "2   qx_testing                                  app_labels        False\n",
       "3   qx_testing                                     breastw        False\n",
       "4   qx_testing                               credit_bureau        False\n",
       "5   qx_testing                 credit_credit_card_balanced        False\n",
       "6   qx_testing                    credit_pos_cash_balanced        False\n",
       "7   qx_testing                 credit_previous_application        False\n",
       "8   qx_testing                                 credit_test        False\n",
       "9   qx_testing                                credit_train        False\n",
       "10  qx_testing                                 emnist_test        False\n",
       "11  qx_testing                                emnist_train        False\n",
       "12  qx_testing                                      events        False\n",
       "13  qx_testing                                 flw_dataset        False\n",
       "14  qx_testing                            gender_age_train        False\n",
       "15  qx_testing                        gender_age_train_all        False\n",
       "16  qx_testing                              give_me_credit        False\n",
       "17  qx_testing                home_credit_application_test        False\n",
       "18  qx_testing               home_credit_application_train        False\n",
       "19  qx_testing                          home_credit_bureau        False\n",
       "20  qx_testing                  home_credit_bureau_balance        False\n",
       "21  qx_testing             home_credit_credit_card_balance        False\n",
       "22  qx_testing                         home_credit_feature        False\n",
       "23  qx_testing           home_credit_installments_payments        False\n",
       "24  qx_testing                home_credit_pos_cash_balance        False\n",
       "25  qx_testing            home_credit_previous_application        False\n",
       "26  qx_testing                            label_categories        False\n",
       "27  qx_testing                                  mnist_test        False\n",
       "28  qx_testing                                 mnist_train        False\n",
       "29  qx_testing                    phone_brand_device_model        False\n",
       "30  qx_testing         purchase_redemption_mfd_bank_shibor        False\n",
       "31  qx_testing  purchase_redemption_mfd_day_share_interest        False\n",
       "32  qx_testing            purchase_redemption_user_balance        False\n",
       "33  qx_testing            purchase_redemption_user_profile        False\n",
       "34  qx_testing                                      reddit        False\n",
       "35  qx_testing                                        stmt        False\n",
       "36  qx_testing                          taxi_trip_duration        False\n",
       "37  qx_testing                        uci_credit_card_data        False\n",
       "38  qx_testing                                  web_google        False"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# # the spark short for SparkSession and sc short for SparkContext have already declared\n",
    "spark.sql(\"show databases\").show()\n",
    "spark.sql(\"use qx_testing\").show()\n",
    "spark.sql(\"show tables\").toPandas()\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 重新刷一遍表"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T09:45:24.616278Z",
     "start_time": "2019-04-04T09:45:24.607845Z"
    }
   },
   "outputs": [],
   "source": [
    "paths='/opt/notebook/xuyinghao'\n",
    "\n",
    "def createtTable(paths):\n",
    "    for file in os.listdir():\n",
    "        col_list =[]\n",
    "        if 'csv' in file:\n",
    "            print('table %s 正在导入'%(file.replace('.csv','')))\n",
    "            start = time.time()\n",
    "            dir_ = os.path.join(paths,file)\n",
    "            df = pd.read_csv(dir_)\n",
    "            df.replace(np.NAN,'',inplace=True)\n",
    "            for col in df.columns:\n",
    "                col_list.append(col.lower())\n",
    "            df.columns = col_list\n",
    "            df[col_list]=df[col_list].astype(str)\n",
    "            spark_table = spark.createDataFrame(df,verifySchema=False)\n",
    "            del df,col_list\n",
    "            gc.collect()\n",
    "            spark_table.write.saveAsTable(name='qx_testing.home_credit_%s'%(file.replace('.csv','')),mode='overwrite',partitionBy=None)\n",
    "            end = time.time()\n",
    "            print('table %s 导入成功!'%(file.replace('.csv','')))\n",
    "            print('导入开销%f s'%(end-start))\n",
    " \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T09:45:26.104566Z",
     "start_time": "2019-04-04T09:45:26.101706Z"
    }
   },
   "outputs": [],
   "source": [
    "# createtTable(paths)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义一些辅助函数"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T05:50:31.676383Z",
     "start_time": "2019-04-08T05:50:31.664677Z"
    }
   },
   "outputs": [],
   "source": [
    "\n",
    "def change_age_tobin(days_birth):\n",
    "    \n",
    "    x = - days_birth/365\n",
    "    if x < 20: return 1\n",
    "    elif x < 30: return 2\n",
    "    elif x < 40: return 3\n",
    "    elif x < 50: return 4\n",
    "    elif x < 60: return 5\n",
    "    else: \n",
    "        return 0\n",
    "    \n",
    "change_age_tobin = udf(change_age_tobin,returnType=IntegerType())\n",
    "    \n",
    "def cal_mean(df, group_cols, col, agg_name):\n",
    "    \"\"\"\n",
    "    计算均值\n",
    "    \"\"\"\n",
    "    gp = df.select(group_cols+[col]).groupBy(group_cols).agg(mean(col).alias(agg_name))\n",
    "    df = df.join(gp,on=group_cols,how='left')\n",
    "    del gp;gc.collect()\n",
    "    return df\n",
    "    \n",
    "\n",
    "\n",
    "def cal_std(df, group_cols, col, agg_name):\n",
    "    \"\"\"\n",
    "    计算标准差\n",
    "    \"\"\"\n",
    "    gp = df[group_cols + [col]].groupby(group_cols).agg(stddev(col).alias(agg_name))\n",
    "    df = df.join(gp, on=group_cols, how='left')\n",
    "    del gp\n",
    "    gc.collect()\n",
    "    return df\n",
    "\n",
    "def onehot_label_encoder(df):\n",
    "    \"\"\"\n",
    "    将object类型的变量进行编码，小于等于2的进行标签编码，大于2的onehot编码,\n",
    "    同时对于高基数类别型特征可以使用平均数编码\n",
    "    \"\"\"\n",
    "    le = LabelEncoder()\n",
    "\n",
    "    for col in df.columns:\n",
    "        if df[col].dtype=='object'and len(df[col].unique())<=2:\n",
    "            le.fit(df[col])\n",
    "            df[col] = le.transform(df[col])\n",
    "        else:\n",
    "            if df[col].dtype=='object'and len(df[col].unique())>2:\n",
    "                df = pd.get_dummies(df,columns=[col])\n",
    "    return df\n",
    "def read_table(file_or_path):\n",
    "    df = pd.read_csv(file_or_path)\n",
    "    col_list = []\n",
    "    for col in df.columns:\n",
    "        col_list.append(col.lower())\n",
    "    df.columns = col_list\n",
    "    return df \n",
    "\n",
    "\n",
    "def group(df_to_agg, prefix, aggregations, aggregate_by= 'sk_id_curr'):\n",
    "    \"\"\"\n",
    "     对每个表按照主键groupby\n",
    "    \"\"\"\n",
    "    agg_df = df_to_agg.groupby(aggregate_by).agg(aggregations)\n",
    "    df_col =agg_df.columns\n",
    "    rename_col =[]\n",
    "    for col in df_col:\n",
    "        tmp_col = re.split(\"\\(|\\)\",col)\n",
    "        tmp_col.remove('')\n",
    "        tmp_col = tmp_col[::-1]\n",
    "        rename_col.append(prefix+'_'.join(tmp_col))\n",
    "    #映射后的列名\n",
    "    mapping = dict(zip(df_col,rename_col))\n",
    "  \n",
    "    agg_df = agg_df.select([col(c).alias(mapping.get(c, c)) for c in df_col])\n",
    "    \n",
    "    return agg_df\n",
    "\n",
    "    \n",
    "\n",
    "\n",
    "@contextmanager\n",
    "def timer(name):\n",
    "    t0 = time.time()\n",
    "    yield\n",
    "    print(\"{} - done in {:.0f}s\".format(name, time.time() - t0))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 处理主表"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T09:45:29.128751Z",
     "start_time": "2019-04-04T09:45:29.115124Z"
    }
   },
   "outputs": [],
   "source": [
    "def process_train_test_spark():\n",
    "    df = spark.sql('select * from qx_testing.home_credit_application_train')\n",
    "    \n",
    "    #过滤掉异常样本以及用NAN替换掉异常值\n",
    "    df = df.filter(df['code_gender']!='XNA').filter(df['amt_income_total']<20000000)\n",
    "    df = df.replace(365243,np.nan,subset=['days_employed'])\n",
    "    df = df.replace(0,np.nan,subset=['days_last_phone_change'])\n",
    "\n",
    "    docs = [x for x in df.columns if 'flag_doc' in x]\n",
    "    sum_string = udf(lambda arr:sum(arr),IntegerType())\n",
    "    #将有flag_doc的列进行求和\n",
    "    df = df.withColumn('document_count',sum_string(array(docs)))\n",
    "    for col in docs:\n",
    "        df = df.drop(col)\n",
    "    \n",
    "    df = df.withColumn('ext_sources_prod',df['ext_source_1']+df['ext_source_2']+df['ext_source_3'])\n",
    "    df = df.withColumn('ext_source_weighted',df['ext_source_1']*2+df['ext_source_2']*1+df['ext_source_3']*3)\n",
    "    df = df.withColumn('age_bin',change_age_tobin('days_birth'))\n",
    "    \n",
    "    \n",
    "    #构建一些比例特征\n",
    "    df = df.withColumn('credit_to_annuity_ratio',df['amt_credit'] / df['amt_annuity'])\n",
    "    df = df.withColumn('credit_to_goods_ratio',df['amt_credit'] / df['amt_goods_price'])\n",
    "    # 收入类型比列\n",
    "    \n",
    "    df = df.withColumn('annuity_to_income_ratio',df['amt_annuity'] / df['amt_income_total'])\n",
    "    df = df.withColumn('credit_to_income_ratio',df['amt_credit'] / df['amt_income_total'])\n",
    "    df = df.withColumn('income_to_employed_ratio',df['amt_income_total'] / df['days_employed'])\n",
    "    df = df.withColumn('income_to_birth_ratio',df['amt_income_total'] / df['days_birth'])\n",
    "    # 时间序列形式比列特征\n",
    "    \n",
    "    df = df.withColumn('employed_to_birth_ratio',df['days_employed'] / df['days_birth'])\n",
    "    df = df.withColumn('car_to_birth_ratio',df['own_car_age'] / df['days_birth'])\n",
    "    df = df.withColumn('car_to_employed_ratio',df['own_car_age'] / df['days_employed'])\n",
    "    \n",
    "    #统计特征\n",
    "    #重点关注EXT_SOURCE_1，EXT_SOURCE_2，EXT_SOURCE_3三个字段\n",
    "    df= df.withColumn('ext_sources_min',least('ext_source_1','ext_source_2','ext_source_3'))\n",
    "    df= df.withColumn('ext_sources_max',greatest('ext_source_1','ext_source_2','ext_source_3'))\n",
    "    df = df.withColumn('ext_sources_mean',(df['ext_source_1']+df['ext_source_2']+df['ext_source_3'])/3)\n",
    "    \n",
    " \n",
    "    group_col = ['organization_type', 'name_education_type', 'occupation_type', 'age_bin', 'code_gender']\n",
    "  \n",
    "    #根据group_col来计算分组后的ext_source_median\n",
    "    df=cal_mean(df,group_col,'ext_sources_mean','group_ext_sources_median')\n",
    "    df =cal_std(df,group_col,'ext_sources_mean','group_ext_sources_std')\n",
    "#     #计算分组后收入的平均值\n",
    "    df = cal_mean(df,group_col,'amt_income_total','group_income_mean')\n",
    "    df = cal_std(df,group_col,'amt_income_total','group_income_std')\n",
    "    #计算分组申请贷款的金额\n",
    "    df = cal_mean(df,group_col,'amt_credit','group_credit_mean')\n",
    "    df = cal_std(df,group_col,'amt_credit','group_credit_std')\n",
    "    df = cal_mean(df,group_col,'amt_annuity','group_annuity_mean')\n",
    "    df = cal_std(df,group_col,'amt_annuity','group_annuity_std')\n",
    "    \n",
    "#     #变量编码\n",
    "#     df = onehot_label_encoder(df)\n",
    "    \n",
    "    return df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 衍生bureau以及bureau_balance表特征"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T05:54:56.780360Z",
     "start_time": "2019-04-08T05:54:56.765254Z"
    }
   },
   "outputs": [],
   "source": [
    "def process_bureau_and_balance():\n",
    "    df_bureau = spark.sql('select * from qx_testing.home_credit_bureau')\n",
    "    df_balance = spark.sql('select * from qx_testing.home_credit_bureau_balance')\n",
    "    df = df_bureau.join(df_balance,on='sk_id_bureau',how='left')\n",
    "    del df_bureau,df_balance;gc.collect()\n",
    "    \n",
    "    ##衍生一些比例特征\n",
    "    df = df.withColumn('credit_sum_overdue_ratio',df['amt_credit_sum_overdue']/df['amt_credit_sum'])\n",
    "    df = df.withColumn('debt_percentage',df['amt_credit_sum_debt']/df['amt_credit_sum'])\n",
    "    df = df.withColumn('credit_to_annuity_ratio',df['amt_credit_sum']/df['amt_annuity'])\n",
    " \n",
    "    \n",
    "    #衍生一些统计特征\n",
    "    group_col=['sk_id_bureau']\n",
    "    df = cal_mean(df,group_col,'amt_credit_sum_debt','group_sum_debt_mean')\n",
    "    df= cal_std(df,group_col,'amt_credit_sum_debt','group_sum_debt_std')\n",
    "    df = cal_mean(df,group_col,'amt_credit_sum_overdue','group_sum_overdue_mean')\n",
    "    df = cal_std(df,group_col,'amt_credit_sum_overdue','group_sum_overdue_std')\n",
    "    \n",
    "    #类别型字段编码\n",
    "    bureau_agg ={\n",
    "#     'sk_id_bureau': ['nunique'],\n",
    "    'days_credit': ['min', 'max', 'mean'],\n",
    "    'days_credit_enddate': ['min', 'max'],\n",
    "    'amt_credit_max_overdue': ['max', 'mean'],\n",
    "    'amt_credit_sum': ['max', 'mean', 'sum'],\n",
    "    'amt_credit_sum_debt': ['max', 'mean', 'sum'],\n",
    "    'amt_credit_sum_overdue': ['max', 'mean', 'sum'],\n",
    "    'amt_annuity': ['mean'],\n",
    "    'amt_credit_sum_debt':['mean'],\n",
    "    'amt_credit_sum_overdue':['mean'],\n",
    "    # 类别型特征\n",
    "#     'status_0': ['mean'],\n",
    "#     'status_1': ['mean'],\n",
    "#     'status_2': ['mean'],\n",
    "#     'status_C': ['mean'],\n",
    "#     'status_X': ['mean'],\n",
    "#     'credit_active_Active': ['mean'],\n",
    "#     'credit_active_Closed': ['mean'],\n",
    "#     'credit_active_Sold': ['mean'],\n",
    "#     'credit_type_Mortgage': ['mean'],\n",
    "#     'credit_type_Microloan': ['mean']\n",
    "}\n",
    "    #聚合特征\n",
    "#     df = group(df,prefix='bureau_',aggregations=bureau_agg)\n",
    "\n",
    " \n",
    "    return df\n",
    "   \n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T05:54:57.920021Z",
     "start_time": "2019-04-08T05:54:57.150345Z"
    }
   },
   "outputs": [],
   "source": [
    "df = process_bureau_and_balance()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T06:03:19.913551Z",
     "start_time": "2019-04-08T06:03:19.882434Z"
    }
   },
   "outputs": [
    {
     "ename": "AssertionError",
     "evalue": "all exprs should be Column",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mAssertionError\u001b[0m                            Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-18-6cfaaa1bde3e>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mtmp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgroupBy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'sk_id_curr'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0magg\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m{\u001b[0m\u001b[0;34m'days_credit'\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0mcol\u001b[0m\u001b[0;34m}\u001b[0m  \u001b[0;32mfor\u001b[0m \u001b[0mcol\u001b[0m \u001b[0;32min\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m'min'\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m'max'\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m'mean'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[0;32m/opt/cloudera/parcels/AIP/lib/aip/spark-2.2.0/python/pyspark/sql/group.py\u001b[0m in \u001b[0;36magg\u001b[0;34m(self, *exprs)\u001b[0m\n\u001b[1;32m     87\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     88\u001b[0m             \u001b[0;31m# Columns\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 89\u001b[0;31m             \u001b[0;32massert\u001b[0m \u001b[0mall\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mColumn\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mc\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mexprs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"all exprs should be Column\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     90\u001b[0m             jdf = self._jgd.agg(exprs[0]._jc,\n\u001b[1;32m     91\u001b[0m                                 _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))\n",
      "\u001b[0;31mAssertionError\u001b[0m: all exprs should be Column"
     ]
    }
   ],
   "source": [
    "tmp = df.groupBy('sk_id_curr').agg({'days_credit':col})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-08T06:02:17.917351Z",
     "start_time": "2019-04-08T06:02:17.912717Z"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['sk_id_curr', 'avg(days_credit)']"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "tmp.columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 衍生previous_applicaton表特征"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T10:00:04.377811Z",
     "start_time": "2019-04-04T10:00:04.369720Z"
    }
   },
   "outputs": [],
   "source": [
    "def process_previous_application():\n",
    "    df = spark.sql('select * from qx_testing.home_credit_previous_application')\n",
    "    #衍生一些比例特征以及差值特征\n",
    "    df = df.withColumn('application_credit_diff',df['amt_application'] - df['amt_credit'])\n",
    "    df = df.withColumn('application_to_credit_ratio',df['amt_application']/df['amt_credit'])\n",
    "    df = df.withColumn('credit_to_annuity_ratio',df['amt_credit']/df['amt_annuity'])\n",
    "    \n",
    "    group_col = ['name_client_type','name_contract_status','name_contract_type','name_cash_loan_purpose','code_reject_reason']\n",
    "    \n",
    "    df = cal_mean(df,group_col,'amt_annuity','group_annuity_mean')\n",
    "    df = cal_std(df,group_col,'amt_annuity','group_annuity_std')\n",
    "    df = cal_mean(df,group_col,'amt_credit','group_amt_credity_mean')\n",
    "    df = cal_std(df,group_col,'amt_credit','group_amt_credit_std')\n",
    "    df = cal_mean(df,group_col,'amt_application','group_amt_application_mean')\n",
    "    df = cal_std(df,group_col,'amt_application','group_amt_application_std')\n",
    "    \n",
    "    #将365243替换为nan\n",
    "    subset = ['days_first_drawing','days_first_due','days_last_due_1st_version','days_last_due','days_termination']\n",
    "    df = df.replace(365243,np.nan,subset=subset)\n",
    "   \n",
    "    \n",
    "    return df\n",
    "    \n",
    "    \n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 衍生 pos_cash表特征"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T10:04:04.142009Z",
     "start_time": "2019-04-04T10:04:04.135825Z"
    }
   },
   "outputs": [],
   "source": [
    "def process_pos_cash_balance():\n",
    "    \"\"\"Load the POS_CASH balance table and add derived feature columns.\n",
    "\n",
    "    Reads ``qx_testing.home_credit_pos_cash_balance`` through the notebook's\n",
    "    ``spark`` session and adds:\n",
    "\n",
    "    * ``late_payment``: 1 when ``sk_dpd`` > 0, otherwise 0 (integer column).\n",
    "    * ``sk_dpd_diff``: ``sk_dpd - sk_dpd_def``.\n",
    "    * ``instalment_diff``: ``cnt_instalment - cnt_instalment_future``.\n",
    "    * ``group_cnt_instalment_*`` / ``group_cnt_instalment_future_*``: output of\n",
    "      the notebook helpers ``cal_mean`` / ``cal_std`` keyed on ``sk_id_curr``.\n",
    "\n",
    "    Returns:\n",
    "        The DataFrame returned by the final ``cal_std`` call.\n",
    "    \"\"\"\n",
    "    df = spark.sql('select * from qx_testing.home_credit_pos_cash_balance')\n",
    "\n",
    "    # Built-in column expression instead of a Python UDF: the previous lambda\n",
    "    # raised TypeError for a null sk_dpd (``None > 0`` on Python 3). Here a null\n",
    "    # comparison is simply not true, so null rows fall through to 0.\n",
    "    # ``when`` comes from the notebook's ``from pyspark.sql.functions import *``.\n",
    "    df = df.withColumn('late_payment', when(df['sk_dpd'] > 0, 1).otherwise(0))\n",
    "    df = df.withColumn('sk_dpd_diff', df['sk_dpd'] - df['sk_dpd_def'])\n",
    "    df = df.withColumn('instalment_diff', df['cnt_instalment'] - df['cnt_instalment_future'])\n",
    "\n",
    "    # Per-client statistics; the helper semantics are defined earlier in the notebook.\n",
    "    df = cal_mean(df, ['sk_id_curr'], 'cnt_instalment', 'group_cnt_instalment_mean')\n",
    "    df = cal_std(df, ['sk_id_curr'], 'cnt_instalment', 'group_cnt_instalment_std')\n",
    "    df = cal_mean(df, ['sk_id_curr'], 'cnt_instalment_future', 'group_cnt_instalment_future_mean')\n",
    "    df = cal_std(df, ['sk_id_curr'], 'cnt_instalment_future', 'group_cnt_instalment_future_std')\n",
    "\n",
    "    # TODO(review): the original note here only said \"encoding\"; categorical\n",
    "    # encoding appears to be unimplemented for this table.\n",
    "    return df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 衍生 installments_payment表特征\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T10:17:27.918881Z",
     "start_time": "2019-04-04T10:17:27.913464Z"
    }
   },
   "outputs": [],
   "source": [
    "def process_payment():\n",
    "    \"\"\"Load the installments_payments table and add derived feature columns.\n",
    "\n",
    "    Reads ``qx_testing.home_credit_installments_payments`` through the\n",
    "    notebook's ``spark`` session and adds:\n",
    "\n",
    "    * ``days_payment_diff``: ``days_instalment - days_entry_payment``.\n",
    "    * ``flag_pay_more``: see ``_paid_more`` below (integer, nullable).\n",
    "    * ``group_instalment_*`` / ``group_amt_instalment_*``: output of the\n",
    "      notebook helpers ``cal_mean`` / ``cal_std`` keyed on ``sk_id_curr``.\n",
    "\n",
    "    Returns:\n",
    "        The DataFrame returned by the final ``cal_std`` call.\n",
    "    \"\"\"\n",
    "    df = spark.sql('select * from qx_testing.home_credit_installments_payments')\n",
    "\n",
    "    df = df.withColumn('days_payment_diff', df['days_instalment'] - df['days_entry_payment'])\n",
    "\n",
    "    def _paid_more(paid, due):\n",
    "        \"\"\"Return 1 when ``paid`` exceeds ``due``, with explicit null rules.\n",
    "\n",
    "        Both missing -> None; only ``paid`` missing -> 0; only ``due``\n",
    "        missing -> 1; otherwise 1 if ``paid - due > 0`` else 0.\n",
    "        \"\"\"\n",
    "        # Identity checks (``is None``) are the idiomatic, unambiguous null test.\n",
    "        if paid is None and due is None:\n",
    "            return None\n",
    "        if paid is None:\n",
    "            return 0\n",
    "        if due is None:\n",
    "            return 1\n",
    "        return 1 if paid - due > 0 else 0\n",
    "\n",
    "    # Keep the plain function and its UDF wrapper under separate names so the\n",
    "    # pure-Python rule stays callable on its own.\n",
    "    paid_more_udf = udf(_paid_more, IntegerType())\n",
    "\n",
    "    df = df.withColumn('flag_pay_more', paid_more_udf('amt_payment', 'amt_instalment'))\n",
    "\n",
    "    # Per-client statistics; the helper semantics are defined earlier in the notebook.\n",
    "    df = cal_mean(df, ['sk_id_curr'], 'days_instalment', 'group_instalment_mean')\n",
    "    df = cal_std(df, ['sk_id_curr'], 'days_instalment', 'group_instalment_std')\n",
    "    df = cal_mean(df, ['sk_id_curr'], 'amt_instalment', 'group_amt_instalment_mean')\n",
    "    df = cal_std(df, ['sk_id_curr'], 'amt_instalment', 'group_amt_instalment_std')\n",
    "\n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T10:17:29.285664Z",
     "start_time": "2019-04-04T10:17:28.822856Z"
    }
   },
   "outputs": [],
   "source": [
    "# Run process_payment() and bind its result to the notebook-level name `df`.\n",
    "df = process_payment()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 衍生 credit_card表特征"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T11:08:09.824974Z",
     "start_time": "2019-04-04T11:08:09.820628Z"
    }
   },
   "outputs": [],
   "source": [
    "def process_credit_card():\n",
    "    \"\"\"Load the credit_card_balance table and add derived feature columns.\n",
    "\n",
    "    Reads ``qx_testing.home_credit_credit_card_balance`` through the\n",
    "    notebook's ``spark`` session and adds:\n",
    "\n",
    "    * ``limit_use``: ``amt_balance / amt_credit_limit_actual``.\n",
    "    * ``payment_div_min``: ``amt_payment_current / amt_inst_min_regularity``.\n",
    "    * ``late_payment``: 1 when ``sk_dpd`` > 0, otherwise 0 (integer column).\n",
    "    * ``drawing_limit_ratio``:\n",
    "      ``amt_drawings_atm_current / amt_credit_limit_actual``.\n",
    "\n",
    "    Returns:\n",
    "        The Spark DataFrame with the four extra columns.\n",
    "    \"\"\"\n",
    "    df = spark.sql('select * from qx_testing.home_credit_credit_card_balance')\n",
    "\n",
    "    df = df.withColumn('limit_use', df['amt_balance'] / df['amt_credit_limit_actual'])\n",
    "\n",
    "    # Fix: the column is named ``payment_div_min`` and sits next to the other\n",
    "    # ratio features, but the previous code subtracted instead of dividing.\n",
    "    df = df.withColumn('payment_div_min', df['amt_payment_current'] / df['amt_inst_min_regularity'])\n",
    "\n",
    "    # Fix: the previous ``udf(lambda ...)`` declared no return type, so the flag\n",
    "    # was emitted as a string column, and it raised TypeError on a null sk_dpd.\n",
    "    # The built-in expression yields an integer column and maps nulls to 0.\n",
    "    # ``when`` comes from the notebook's ``from pyspark.sql.functions import *``.\n",
    "    df = df.withColumn('late_payment', when(df['sk_dpd'] > 0, 1).otherwise(0))\n",
    "\n",
    "    df = df.withColumn('drawing_limit_ratio', df['amt_drawings_atm_current'] / df['amt_credit_limit_actual'])\n",
    "\n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "ExecuteTime": {
     "end_time": "2019-04-04T11:13:44.305011Z",
     "start_time": "2019-04-04T11:13:44.294674Z"
    }
   },
   "outputs": [],
   "source": [
    "def _frame_shape(frame):\n",
    "    \"\"\"Return ``(row_count, column_count)`` for a pandas or Spark DataFrame.\"\"\"\n",
    "    if isinstance(frame, pd.DataFrame):\n",
    "        return frame.shape\n",
    "    # Spark DataFrames expose no ``.shape`` attribute; count() runs a Spark job.\n",
    "    return (frame.count(), len(frame.columns))\n",
    "\n",
    "\n",
    "def main():\n",
    "    \"\"\"Build the combined feature frame by left-joining every source table.\n",
    "\n",
    "    Starts from ``process_train_test()`` and, stage by stage, left-joins the\n",
    "    frames returned by the other ``process_*`` loaders on ``sk_id_curr``. Each\n",
    "    stage runs inside the notebook's ``timer`` context manager and prints the\n",
    "    shape of the frame it just produced.\n",
    "\n",
    "    NOTE(review): the loaders visible in this part of the notebook appear to\n",
    "    return row-level frames with overlapping column names (for example\n",
    "    ``sk_dpd`` and ``late_payment``), so successive left joins may multiply\n",
    "    rows and yield duplicate column names. Confirm whether a per-client\n",
    "    aggregation is intended before joining.\n",
    "\n",
    "    Returns:\n",
    "        The joined DataFrame.\n",
    "    \"\"\"\n",
    "    with timer(\"application_train and application_test\"):\n",
    "        df = process_train_test()\n",
    "        print(\"Application dataframe shape: \", _frame_shape(df))\n",
    "\n",
    "    # (timer label, loader, printed prefix) for every table joined onto the base frame.\n",
    "    stages = [\n",
    "        (\"Bureau and bureau_balance data\", process_bureau_and_balance, \"Bureau dataframe shape: \"),\n",
    "        (\"previous_application\", process_previous_application, \"Previous dataframe shape: \"),\n",
    "        (\"pos_cash_balance\", process_pos_cash_balance, \"POS dataframe shape:\"),\n",
    "        (\"installments_payment\", process_payment, \"Installment_payment shape :\"),\n",
    "        (\"credit_card \", process_credit_card, \"Credit_card shape:\"),\n",
    "    ]\n",
    "    for timer_label, build_features, shape_label in stages:\n",
    "        with timer(timer_label):\n",
    "            features = build_features()\n",
    "            df = df.join(features, on='sk_id_curr', how='left')\n",
    "            print(shape_label, _frame_shape(features))\n",
    "            # Drop the local reference before the next stage, as the original did.\n",
    "            del features\n",
    "            gc.collect()\n",
    "    return df"
   ]
  }
 ],
 "metadata": {
  "hide_input": false,
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
